{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "50fb8419",
   "metadata": {},
   "source": [
    "# AIS-ETL Examples\n",
    "\n",
    "This notebook shows how to initialize ETLs using the AIStore SDK.\n",
    "\n",
    "For details, see the [ETL documentation](https://github.com/NVIDIA/aistore/blob/main/docs/etl.md).\n",
    "\n",
    "Sample transformers are available at [ais-etl/transformers](https://github.com/NVIDIA/ais-etl/tree/main/transformers).\n",
    "\n",
    "> **Note:** Install ETL support with:\n",
    "> ```bash\n",
    "> pip install aistore[etl]\n",
    "> ```\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "e6193891",
   "metadata": {},
   "outputs": [],
   "source": [
    "from aistore import Client\n",
    "\n",
    "# AIS-ETL requires an AIStore cluster running on Kubernetes.\n",
    "# Replace the address below with the IP address and port of your AIStore cluster.\n",
    "client = Client(\"http://10.52.160.25:51080\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "b5ab67d1",
   "metadata": {},
   "source": [
    "## ETL Webserver Framework\n",
    "\n",
    "**AIS ETL** is language- and framework-agnostic. You can deploy your own custom web server as a transformation pod, supporting both **inline transformations** (real-time GET requests) and **offline batch transformations** (bucket-to-bucket).\n",
    "\n",
    "However, building such a server from scratch involves more than just writing transformation logic. It must also be capable of:\n",
    "\n",
    "* Performing health checks\n",
    "* Communicating with AIStore targets\n",
    "* Parsing [`etl args`](https://github.com/NVIDIA/aistore/blob/main/docs/cli/etl.md#transform-object-with-arguments): user-defined parameters that control the transformation behavior\n",
    "* Supporting [`direct put`](#direct-put-optimization-faster-bucket-to-bucket-etl-transformation), which allows transformed objects to be directly written to the target bucket without going through the client\n",
    "* Managing HTTP and WebSocket protocols with proper concurrency control\n",
    "\n",
    "Selecting the right web server and communication strategy depends on factors like object size and volume, desired concurrency model, and whether you need a synchronous ([WSGI](https://peps.python.org/pep-3333/)) or asynchronous ([ASGI](https://asgi.readthedocs.io/en/latest/introduction.html)) stack. Each option has its own trade-offs.\n",
    "\n",
    "To simplify this, we’ve introduced the **AIS-ETL Web Server Framework** in both **[Go](https://github.com/NVIDIA/aistore/tree/main/ext/etl/webserver#readme)** and **[Python](https://github.com/NVIDIA/aistore/blob/main/python/aistore/sdk/etl/webserver/README.md)**. These SDKs abstract away the boilerplate so you can build and deploy custom ETL containers in minutes. You focus solely on your transformation logic; the SDK handles everything else, including networking, protocol handling, and high-throughput optimizations."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "b77edb98",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Import ETL webserver classes\n",
    "# See ETL webserver docs: https://github.com/NVIDIA/aistore/tree/main/python/aistore/sdk/etl/webserver\n",
    "from aistore.sdk.etl.webserver.fastapi_server import FastAPIServer\n",
    "from aistore.sdk.etl.webserver.flask_server import FlaskServer\n",
    "from aistore.sdk.etl.webserver.http_multi_threaded_server import HTTPMultiThreadedServer"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f8ea93db",
   "metadata": {},
   "source": [
    "## Initializing ETLs with the Python SDK\n",
    "\n",
    "You can set up an ETL in three ways:\n",
    "\n",
    "1. **`init_class`**  \n",
    "   - For pure-Python transforms.  \n",
    "   - Decorate an `ETLServer` subclass (implementing `transform`) to register it.  \n",
    "   - Best when you only need Python and PyPI dependencies.\n",
    "\n",
    "2. **`init`**  \n",
    "   - Use a container image for your ETL logic.  \n",
    "   - Configure options like `comm_type`, timeouts, commands, etc.  \n",
    "   - Works with built-in transformers or your own image.  \n",
    "   - Also available via the CLI.\n",
    "\n",
    "3. **`init_spec`** (_Advanced Usage_)\n",
    "   - Supply a full Kubernetes Pod spec.  \n",
    "   - Allows advanced tweaks (health checks, init containers, etc.).  \n",
    "   - Kept for backward compatibility or deep customizations.\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "7694f54a",
   "metadata": {},
   "source": [
    "## Example: Uppercase Transformer (`init_class`)\n",
    "\n",
    "Convert text to uppercase with a Python class."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "bde32313",
   "metadata": {},
   "outputs": [],
   "source": [
    "etl_upper_case = client.etl(\"etl-upper-case\")\n",
    "\n",
    "\n",
    "@etl_upper_case.init_class()\n",
    "class UpperCaseETL(FastAPIServer):\n",
    "    \"\"\"\n",
    "    ETL webserver that converts data to uppercase.\n",
    "    \"\"\"\n",
    "\n",
    "    def transform(self, data, *_args):\n",
    "        return data.upper()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "3a713e12",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Original Object Content:\n",
      "b'Hello ais-etl!'\n",
      "\n",
      "Transformed Object Content (Uppercase):\n",
      "b'HELLO AIS-ETL!'\n"
     ]
    }
   ],
   "source": [
    "# Test the UpperCase ETL\n",
    "\n",
    "# Create a bucket and upload an object to test\n",
    "bucket = client.bucket(\"etl-examples\").create(exist_ok=True)\n",
    "obj = bucket.object(\"test.txt\")\n",
    "# Write content to the object\n",
    "obj.get_writer().put_content(b\"Hello ais-etl!\")\n",
    "\n",
    "print(\"Original Object Content:\")\n",
    "print(obj.get_reader().read_all())\n",
    "\n",
    "from aistore.sdk.etl import ETLConfig\n",
    "\n",
    "print(\"\\nTransformed Object Content (Uppercase):\")\n",
    "print(obj.get_reader(etl=ETLConfig(etl_upper_case.name)).read_all())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0caf2fa4",
   "metadata": {},
   "source": [
    "## Example: XXHash Transformer (`init_class`)\n",
    "\n",
    "Hash bytes with a seed passed per request.\n",
    "\n",
    "This example shows how to use external packages. The XXHash transformer reads the ETL arguments from the inline transform request and uses them as the seed value. It demonstrates how to pass custom per-object metadata and use it in your ETL pod."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "id": "4793e9ee",
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import xxhash\n",
    "\n",
    "hash_etl = client.etl(\"etl-xxhash\")\n",
    "\n",
    "# xxhash is a fast non-cryptographic hash function not in the standard library.\n",
    "# It must be installed separately.\n",
    "\n",
    "# This example uses communication type \"hpull\".\n",
    "# See communication options: https://github.com/NVIDIA/aistore/blob/main/docs/etl.md#communication-mechanisms\n",
    "\n",
    "\n",
    "# Set the `SEED_DEFAULT` env var to provide a default seed value in the ETL container.\n",
    "@hash_etl.init_class(comm_type=\"hpull\", dependencies=[\"xxhash\"], SEED_DEFAULT=\"500\")\n",
    "class XXHash(FastAPIServer):\n",
    "    def __init__(self):\n",
    "        super().__init__()\n",
    "        self.default_seed = int(os.getenv(\"SEED_DEFAULT\", \"0\"))\n",
    "\n",
    "    def transform(\n",
    "        self,\n",
    "        data: bytes,\n",
    "        _path: str,\n",
    "        etl_args: str,\n",
    "    ) -> bytes:\n",
    "        seed = int(etl_args) if etl_args else self.default_seed\n",
    "        hasher = xxhash.xxh64(seed=seed)\n",
    "        hasher.update(data)\n",
    "        return hasher.hexdigest().encode(\"ascii\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "6c4614a4",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Original Object Content:\n",
      "b'Hello ais-etl!'\n",
      "\n",
      "Transformed Object Content (XXHash with default seed 500):\n",
      "b'5a1c0264c777ae72'\n",
      "\n",
      "Transformed Object Content (XXHash with seed value 1000):\n",
      "b'337eb4735c14a118'\n"
     ]
    }
    ],
   "source": [
    "# Use the ETL defined above\n",
    "\n",
    "# Read original and transformed object content\n",
    "print(\"Original Object Content:\")\n",
    "print(obj.get_reader().read_all())\n",
    "\n",
    "print(\"\\nTransformed Object Content (XXHash with default seed 500):\")\n",
    "print(obj.get_reader(etl=ETLConfig(hash_etl.name)).read_all())\n",
    "\n",
    "# To use a custom seed (1000), pass it via ETLConfig's `args`; the transformer receives it as `etl_args`:\n",
    "print(\"\\nTransformed Object Content (XXHash with seed value 1000):\")\n",
    "\n",
    "print(obj.get_reader(etl=ETLConfig(hash_etl.name, args=1000)).read_all())"
   ]
  },
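  {
   "cell_type": "markdown",
   "id": "d3a91f07",
   "metadata": {},
   "source": [
    "The seed selection used by the XXHash transformer can be tried locally, without a cluster. The cell below is a minimal sketch under stated assumptions: `resolve_seed`, `local_checksum`, and `LOCAL_DEFAULT_SEED` are hypothetical names that are not part of the AIStore SDK, and `zlib.crc32` from the standard library stands in for `xxhash` so the cell runs without extra dependencies. It only illustrates that a non-empty `etl_args` string overrides the default seed, while an empty one falls back to it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7be240c5",
   "metadata": {},
   "outputs": [],
   "source": [
    "import zlib\n",
    "\n",
    "# Hypothetical stand-in for the SEED_DEFAULT value configured above\n",
    "LOCAL_DEFAULT_SEED = 500\n",
    "\n",
    "\n",
    "def resolve_seed(etl_args, default_seed=LOCAL_DEFAULT_SEED):\n",
    "    \"\"\"Return the per-request seed, or the default when no argument is given.\"\"\"\n",
    "    return int(etl_args) if etl_args else default_seed\n",
    "\n",
    "\n",
    "def local_checksum(data, etl_args=\"\"):\n",
    "    \"\"\"Checksum `data` with zlib.crc32 (a stand-in for xxhash), seeded per request.\"\"\"\n",
    "    seed = resolve_seed(etl_args)\n",
    "    return format(zlib.crc32(data, seed), \"08x\").encode(\"ascii\")\n",
    "\n",
    "\n",
    "# Default seed, then a seed passed the way an ETL argument would arrive (as a string)\n",
    "print(local_checksum(b\"Hello ais-etl!\"))\n",
    "print(local_checksum(b\"Hello ais-etl!\", \"1000\"))"
   ]
  },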
  {
   "cell_type": "markdown",
   "id": "61df3455",
   "metadata": {},
   "source": [
    "## Example: FFmpeg Transformer (`init`)\n",
    "\n",
    "Run a pre-built FFmpeg container to convert audio to WAV.\n",
    "\n",
    "This example uses one of the pre-built images from [`ais-etl/transformers`](https://github.com/NVIDIA/ais-etl/tree/main/transformers).\n",
    "\n",
    "It runs the [FFmpeg transformer](https://github.com/NVIDIA/ais-etl/tree/main/transformers/FFmpeg), which converts audio files to WAV format with control over audio channels (`AC`) and audio rate (`AR`)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "bec14e84",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'etl-tbh6yoAuh'"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
    ],
   "source": [
    "ffmpeg_etl = client.etl(\"etl-ffmpeg\")\n",
    "\n",
    "ffmpeg_etl.init(image=\"aistorage/transformer_ffmpeg:latest\", AR=\"16000\", AC=\"1\")\n",
    "\n",
    "# To test this, you will need to have an audio file in a bucket.\n",
    "# wav_bytes = client.bucket(\"<audio-files-bck>\").object(\"<audio-file>.<wav/flac/mp3>\").get_reader(etl=ETLConfig(ffmpeg_etl.name)).read_all()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3859731e",
   "metadata": {},
   "source": [
    "## Example: Pod Spec (`init_spec`) — Advanced usage\n",
    "\n",
    "Use a full Kubernetes Pod template only if you need to customize the Pod spec (resources, init containers, labels, etc.)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "5bc1b916",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'etl-WLzByoauh'"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
    ],
   "source": [
    "ffmpeg_spec_etl = client.etl(\"etl-ffmpeg-pod-spec\")\n",
    "\n",
    "pod_spec_tmpl = \"\"\"\n",
    "apiVersion: v1\n",
    "kind: Pod\n",
    "metadata:\n",
    "  name: transformer-nemo-ffmpeg\n",
    "  annotations:\n",
    "    # Values it can take [\"hpull://\",\"hpush://\"]\n",
    "    communication_type: \"hpull://\"\n",
    "    wait_timeout: 5m\n",
    "    support_direct_put: \"true\"\n",
    "spec:\n",
    "  containers:\n",
    "    - name: server\n",
    "      image: aistorage/transformer_ffmpeg:latest\n",
    "      imagePullPolicy: Always\n",
    "      ports:\n",
    "        - name: default\n",
    "          containerPort: 8000\n",
    "      # for flask based app\n",
    "      # command: [\"gunicorn\", \"flask_server:flask_app\", \"--bind\", \"0.0.0.0:8000\", \"--workers\", \"4\", \"--log-level\", \"debug\"]\n",
    "      # for http based app\n",
    "      # command: [\"python\", \"http_server.py\"]\n",
    "      # for fastapi based app\n",
    "      command: [\"uvicorn\", \"fastapi_server:fastapi_app\", \"--host\", \"0.0.0.0\", \"--port\", \"8000\", \"--workers\", \"4\", \"--no-access-log\"]\n",
    "      readinessProbe:\n",
    "        httpGet:\n",
    "          path: /health\n",
    "          port: default\n",
    "      env:\n",
    "        - name: AR\n",
    "          value: \"16000\"\n",
    "        - name: AC\n",
    "          value: \"1\"\n",
    "\"\"\"\n",
    "\n",
    "ffmpeg_spec_etl.init_spec(template=pod_spec_tmpl)\n",
    "\n",
    "# To test this, you will need to have an audio file in a bucket.\n",
    "# wav_bytes = client.bucket(\"<audio-files-bck>\").object(\"<audio-file>.<wav/flac/mp3>\").get_reader(etl=ETLConfig(ffmpeg_spec_etl.name)).read_all()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "ec3fe411",
   "metadata": {},
   "source": [
    "## ETL Pipelines\n",
    "\n",
    "You can combine multiple ETLs into a pipeline using the `>>` operator. Pipelines allow you to compose transformations and apply them in sequence during a single object read.\n",
    "\n",
    "- Define ETLs as usual (e.g., with `init_class`).\n",
    "- Chain them: `etl_a >> etl_b >> etl_c`.\n",
    "- Pass the resulting pipeline to `ETLConfig(name=pipeline)` and use it with `get_reader(etl=...)` for inline transforms.\n",
    "\n",
    "Below are examples of:\n",
    "- Two-stage pipeline\n",
    "- Three-stage pipeline\n",
    "- Chaining a multi-stage pipeline with another ETL using `>>`"
   ]
  },
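  {
   "cell_type": "markdown",
   "id": "4c8e2b16",
   "metadata": {},
   "source": [
    "Before running a pipeline against a cluster, the order in which chained stages apply can be checked locally. The cell below is a minimal pure-Python sketch, not the SDK's implementation: `LocalStage` is a hypothetical class that only mimics how `>>` composes stages left to right, and the three stages mirror the uppercase, reverse, and append transforms used in this notebook."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e07a5d39",
   "metadata": {},
   "outputs": [],
   "source": [
    "class LocalStage:\n",
    "    \"\"\"Hypothetical local stand-in for an ETL stage; not part of the AIStore SDK.\"\"\"\n",
    "\n",
    "    def __init__(self, *funcs):\n",
    "        self.funcs = funcs\n",
    "\n",
    "    def __rshift__(self, other):\n",
    "        # a >> b yields a new stage that runs a's functions first, then b's\n",
    "        return LocalStage(*self.funcs, *other.funcs)\n",
    "\n",
    "    def __call__(self, data: bytes) -> bytes:\n",
    "        for func in self.funcs:\n",
    "            data = func(data)\n",
    "        return data\n",
    "\n",
    "\n",
    "upper = LocalStage(bytes.upper)\n",
    "reverse = LocalStage(lambda data: data[::-1])\n",
    "append_done = LocalStage(lambda data: data + b\"::DONE\")\n",
    "\n",
    "local_pipeline = upper >> reverse >> append_done\n",
    "print(local_pipeline(b\"Hello ais-etl!\"))  # b'!LTE-SIA OLLEH::DONE'"
   ]
  },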
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "9fa40807",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Register additional ETLs for pipeline demos\n",
    "etl_reverse = client.etl(\"etl-reverse\")\n",
    "\n",
    "@etl_reverse.init_class()\n",
    "class ReverseETL(FastAPIServer):\n",
    "    \"\"\"Reverse byte content.\"\"\"\n",
    "    def transform(self, data: bytes, *_args) -> bytes:\n",
    "        return data[::-1]\n",
    "\n",
    "etl_append = client.etl(\"etl-append-done\")\n",
    "\n",
    "@etl_append.init_class()\n",
    "class AppendETL(FastAPIServer):\n",
    "    \"\"\"Append a marker to the end of the byte content.\"\"\"\n",
    "    def transform(self, data: bytes, *_args) -> bytes:\n",
    "        return data + b\"::DONE\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "c5402c61",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Original:\n",
      "b'Hello ais-etl!'\n",
      "\n",
      "Two-stage pipeline (Uppercase -> Reverse):\n",
      "b'!LTE-SIA OLLEH'\n"
     ]
    }
   ],
   "source": [
    "# Two-stage pipeline: Uppercase -> Reverse\n",
    "two_stage = etl_upper_case >> etl_reverse\n",
    "\n",
    "print(\"Original:\")\n",
    "print(obj.get_reader().read_all())\n",
    "\n",
    "print(\"\\nTwo-stage pipeline (Uppercase -> Reverse):\")\n",
    "print(obj.get_reader(etl=ETLConfig(name=two_stage)).read_all())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "84821ec6",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Three-stage pipeline (direct chain Uppercase -> Reverse -> Append):\n",
      "b'!LTE-SIA OLLEH::DONE'\n",
      "\n",
      "Chaining multi-stage with another ETL (two_stage >> etl_append):\n",
      "b'!LTE-SIA OLLEH::DONE'\n"
     ]
    }
   ],
   "source": [
    "# Three-stage pipeline and chaining a multi-stage pipeline with another ETL\n",
    "from aistore.sdk.etl import ETLConfig\n",
    "\n",
    "# Three-stage directly: Uppercase -> Reverse -> Append\n",
    "three_stage_direct = etl_upper_case >> etl_reverse >> etl_append\n",
    "print(\"Three-stage pipeline (direct chain Uppercase -> Reverse -> Append):\")\n",
    "print(obj.get_reader(etl=ETLConfig(name=three_stage_direct)).read_all())\n",
    "\n",
    "# Chaining an existing multi-stage with another ETL using >>\n",
    "full_pipeline = two_stage >> etl_append\n",
    "print(\"\\nChaining multi-stage with another ETL (two_stage >> etl_append):\")\n",
    "print(obj.get_reader(etl=ETLConfig(name=full_pipeline)).read_all())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "6f4b3c0f",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Finally, stop and remove the ETLs\n",
    "etl_upper_case.stop()\n",
    "etl_reverse.stop()\n",
    "etl_append.stop()\n",
    "etl_upper_case.delete()\n",
    "etl_reverse.delete()\n",
    "etl_append.delete()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "python_aistore",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
